package cn.mesmile.flink.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author zb
 * @date 2021/8/28 17:17
 * @Description
 */
public class FlinkStateApp {

    /**
     * 使用valueState实现maxBy功能，统计分组内订单金额最高的订单
     * @param args
     */
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> ds = env.socketTextStream("127.0.0.1", 8888);

        DataStream<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new RichFlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
            }
        });

        //一定要key by后才可以使用键控状态ValueState
        SingleOutputStreamOperator<Tuple2<String, Integer>> maxVideoOrder = flatMapDS.keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        }).map(new RichMapFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
            private ValueState<Integer> valueState = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("total", Integer.class));
            }

            @Override
            public Tuple2<String, Integer> map(Tuple3<String, String, Integer> tuple3) throws Exception {
                // 取出State中的最大值
                Integer stateMaxValue = valueState.value();
                Integer currentValue = tuple3.f2;

                if (stateMaxValue == null || currentValue > stateMaxValue) {
                    //更新状态,把当前的作为新的最大值存到状态中
                    valueState.update(currentValue);
                    return Tuple2.of(tuple3.f0, currentValue);
                } else {
                    //历史值更大
                    return Tuple2.of(tuple3.f0, stateMaxValue);
                }
            }
        });

        maxVideoOrder.print();

        env.execute("valueState job");
    }
}
